// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "cloud/cloud_rowset_writer.h"

#include "io/cache/block_file_cache_factory.h"
#include "io/fs/file_system.h"
#include "olap/rowset/rowset_factory.h"

namespace doris {

CloudRowsetWriter::CloudRowsetWriter() = default;

CloudRowsetWriter::~CloudRowsetWriter() = default;

Status CloudRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) {
    _context = rowset_writer_context;
    _rowset_meta = std::make_shared<RowsetMeta>();

    if (_context.is_local_rowset()) {
        // In cloud mode, this branch implies it is an intermediate rowset for external merge sort,
        // we use `global_local_filesystem` to write data to `tmp_file_dir`(see `local_segment_path`).
        _context.tablet_path = io::FileCacheFactory::instance()->get_cache_path();
    } else {
        _rowset_meta->set_remote_storage_resource(*_context.storage_resource);
    }

    _rowset_meta->set_rowset_id(_context.rowset_id);
    _rowset_meta->set_partition_id(_context.partition_id);
    _rowset_meta->set_tablet_id(_context.tablet_id);
    _rowset_meta->set_index_id(_context.index_id);
    _rowset_meta->set_tablet_schema_hash(_context.tablet_schema_hash);
    _rowset_meta->set_rowset_type(_context.rowset_type);
    _rowset_meta->set_rowset_state(_context.rowset_state);
    _rowset_meta->set_segments_overlap(_context.segments_overlap);
    _rowset_meta->set_txn_id(_context.txn_id);
    _rowset_meta->set_txn_expiration(_context.txn_expiration);
    _rowset_meta->set_compaction_level(_context.compaction_level);
    if (_context.rowset_state == PREPARED || _context.rowset_state == COMMITTED) {
        _is_pending = true;
        _rowset_meta->set_load_id(_context.load_id);
    } else {
        // Rowset generated by compaction or schema change
        _rowset_meta->set_version(_context.version);
        DCHECK_NE(_context.newest_write_timestamp, -1);
        _rowset_meta->set_newest_write_timestamp(_context.newest_write_timestamp);
    }
    _rowset_meta->set_tablet_schema(_context.tablet_schema);
    _context.segment_collector = std::make_shared<SegmentCollectorT<BaseBetaRowsetWriter>>(this);
    _context.file_writer_creator = std::make_shared<FileWriterCreatorT<BaseBetaRowsetWriter>>(this);
    return Status::OK();
}

Status CloudRowsetWriter::build(RowsetSharedPtr& rowset) {
    RETURN_IF_ERROR(_close_file_writers());

    // TODO(plat1ko): check_segment_footer

    RETURN_IF_ERROR(_build_rowset_meta(_rowset_meta.get()));
    // If the current load is a partial update, new segments may be appended to the tmp rowset after the tmp rowset
    // has been committed if conflicts occur due to concurrent partial updates. However, when the recycler do recycling,
    // it will generate the paths for the segments to be recycled on the object storage based on the number of segments
    // in the rowset meta. If these newly added segments are written to the object storage and the transaction is aborted
    // due to a failure before successfully updating the rowset meta of the corresponding tmp rowset, these newly added
    // segments cannot be recycled by the recycler on the object storage. Therefore, we need a new state `BEGIN_PARTIAL_UPDATE`
    // to indicate that the recycler should use list+delete to recycle segments. After the tmp rowset's rowset meta being
    // updated successfully, the `rowset_state` will be set to `COMMITTED` and the recycler can do recycling based on the
    // number of segments in the rowset meta safely.
    //
    // rowset_state's FSM:
    //
    //                      transfer 0
    //       PREPARED ---------------------------> COMMITTED
    //                 |                               ^
    //                 | transfer 1                    |
    //                 |                               | transfer 2
    //                 |--> BEGIN_PARTIAL_UPDATE ------|
    //
    // transfer 0 (PREPARED -> COMMITTED): finish writing a rowset and the rowset' meta will not be changed
    // transfer 1 (PREPARED -> BEGIN_PARTIAL_UPDATE): finish writing a rowset, but may append new segments later and the rowset's meta may be changed
    // transfer 2 (BEGIN_PARTIAL_UPDATE -> VISIBLE): finish adding new segments and the rowset' meta will not be changed, the rowset is visible to users
    if (_context.partial_update_info && _context.partial_update_info->is_partial_update) {
        _rowset_meta->set_rowset_state(BEGIN_PARTIAL_UPDATE);
    } else {
        _rowset_meta->set_rowset_state(COMMITTED);
    }

    // update rowset meta tablet schema if tablet schema updated
    auto rowset_schema = _context.merged_tablet_schema != nullptr ? _context.merged_tablet_schema
                                                                  : _context.tablet_schema;
    _rowset_meta->set_tablet_schema(rowset_schema);

    if (_rowset_meta->newest_write_timestamp() == -1) {
        _rowset_meta->set_newest_write_timestamp(UnixSeconds());
    }

    if (auto seg_file_size = _seg_files.segments_file_size(_segment_start_id);
        !seg_file_size.has_value()) [[unlikely]] {
        LOG(ERROR) << "expected segment file sizes, but none presents: " << seg_file_size.error();
    } else {
        _rowset_meta->add_segments_file_size(seg_file_size.value());
    }

    RETURN_NOT_OK_STATUS_WITH_WARN(RowsetFactory::create_rowset(rowset_schema, _context.tablet_path,
                                                                _rowset_meta, &rowset),
                                   "rowset init failed when build new rowset");
    _already_built = true;
    return Status::OK();
}

} // namespace doris
